--- title: Data Transforms for Hyperspectral Datacubes keywords: fastai sidebar: home_sidebar summary: "Hyperspectral datacubes contain three axes (cross-track, along-track, and wavelength) and are stored in 3D numpy ndarrays. When using a pushbroom scanner, the datacube is filled gradually. The implementation here is based on a circular buffer, and we provide additional methods to apply a pipeline of transforms which, for example, can be used for smile correction and radiance conversion." description: "Hyperspectral datacubes contain three axes (cross-track, along-track, and wavelength) and are stored in 3D numpy ndarrays. When using a pushbroom scanner, the datacube is filled gradually. The implementation here is based on a circular buffer, and we provide additional methods to apply a pipeline of transforms which, for example, can be used for smile correction and radiance conversion." nb_path: "nbs/00_data.ipynb" ---
{% raw %}
{% endraw %}

{% include tip.html content='This module can be imported using from openhsi.data import *' %}

{% raw %}
{% endraw %}

Generic Circular Buffer on numpy.ndarrays

The base functionality is implemented as a generic circular buffer. The datatype dtype can be modified as desired, but the default stores int32 digital numbers.

{% raw %}

class CircArrayBuffer[source]

CircArrayBuffer(size:tuple=(100, 100), axis:int=0, dtype:type=int32, show_func:Callable[[ndarray], 'plot']=None)

Circular FIFO Buffer implementation on ndarrays. Each put/get is a (n-1)darray.
Type Default Details
size tuple (100, 100) Shape of n-dim circular buffer to preallocate
axis int 0 Which axis to traverse when filling the buffer
dtype type int32 Buffer numpy data type
show_func Callable[[ndarray], 'plot'] None Custom plotting function if desired
{% endraw %} {% raw %}
{% endraw %} {% raw %}

CircArrayBuffer.put[source]

CircArrayBuffer.put(line:ndarray)

Writes a (n-1)darray into the buffer

CircArrayBuffer.get[source]

CircArrayBuffer.get()

Reads the oldest (n-1)darray from the buffer

CircArrayBuffer.show[source]

CircArrayBuffer.show()

Display the data
{% endraw %}

For example, we can write to a 1D array

{% raw %}
cib = CircArrayBuffer(size=(7,),axis=0)
for i in range(9):
    cib.put(i)
    cib.show()

for i in range(9):
    print(i,cib.get())
#(7) [0 0 0 0 0 0 0]
#(7) [0 1 0 0 0 0 0]
#(7) [0 1 2 0 0 0 0]
#(7) [0 1 2 3 0 0 0]
#(7) [0 1 2 3 4 0 0]
#(7) [0 1 2 3 4 5 0]
#(7) [0 1 2 3 4 5 6]
#(7) [7 1 2 3 4 5 6]
#(7) [7 8 2 3 4 5 6]
0 2
1 3
2 4
3 5
4 6
5 7
6 8
7 None
8 None
{% endraw %}

Or a 2D array

{% raw %}
import holoviews as hv
hv.extension("bokeh",logo=False)

plots_list = []

cib = CircArrayBuffer(size=(4,4),axis=0)
cib.put(1) # scalars are broadcasted to a 1D array
for i in range(5):
    cib.put(cib.get()+1)
    plots_list.append( cib.show().opts(colorbar=True,title=f"i={i}") )

hv.Layout(plots_list).cols(3)
{% endraw %}

Loading Camera Settings and Calibration Files

The OpenHSI camera has a settings dictionary which contains these fields:

  • camera_id is your camera name,
  • row_slice indicates which rows are illuminated; we crop out the rest,
  • resolution is the full pixel resolution given by the camera without cropping,
  • fwhm_nm specifies the size of spectral bands in nanometers,
  • exposure_ms is the camera exposure time last used,
  • luminance is the reference luminance used to convert digital numbers to luminance,
  • longitude is the longitude in degrees east,
  • latitude is the latitude in degrees north,
  • datetime_str is the UTC time at the time of data collection,
  • altitude is the altitude above sea level (assuming the target is at sea level) measured in km,
  • radiosonde_station_num is the station number from http://weather.uwyo.edu/upperair/sounding.html,
  • radiosonde_region is the region code from http://weather.uwyo.edu/upperair/sounding.html,
  • sixs_path is the path to the 6SV executable,
  • binxy is the number of pixels to bin in the (x,y) directions,
  • win_offset is the (x,y) offset from the edge of the detector for a selective readout window (used in combination with a win_resolution smaller than the full detector size),
  • win_resolution is the size of the detector area to read out as (width, height), and
  • pixel_format is the pixel format read out from the sensor, e.g. 8-bit, 10-bit, or 12-bit.

The settings dictionary may also contain additional camera specific fields:

  • mac_addr is the GigE camera MAC address (used by Lucid Vision sensors), and
  • serial_num is the serial number of the detector (used by Ximea and FLIR sensors).

The pickle file is a dictionary with these fields:

  • camera_id is your camera name,
  • HgAr_pic is a picture of a mercury argon lamp's spectral lines for wavelength calibration,
  • flat_field_pic is a picture of a uniformly lit scene used for calculating the illuminated area,
  • smile_shifts is an array of pixel shifts needed to correct for smile error,
  • wavelengths_linear is an array of wavelengths after linear interpolation,
  • wavelengths is an array of wavelengths after cubic interpolation,
  • rad_ref is a 4D datacube with coordinates of cross-track, wavelength, exposure, and luminance,
  • sfit is the spline fit function from the integrating sphere calibration, and
  • rad_fit is the interpolated function of the expected radiance at sensor computed using 6SV.

These files are unique to each OpenHSI camera.
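For illustration, a minimal settings dictionary might look like the following. Every value here is a placeholder for this sketch, not a calibrated value for any real camera:

```python
import json

# Hypothetical settings for illustration only; real values come from
# calibrating your own OpenHSI camera.
settings = {
    "camera_id":      "example_cam",
    "row_slice":      [10, 915],      # illuminated rows to keep
    "resolution":     [1080, 1440],   # full sensor resolution before cropping
    "fwhm_nm":        4,              # spectral band size in nm
    "exposure_ms":    10,
    "binxy":          [1, 1],
    "win_offset":     [0, 0],
    "win_resolution": [1440, 1080],
    "pixel_format":   "12bit",
}

# A settings file is just this dictionary serialised to JSON.
print(json.dumps(settings, indent=2))
```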

{% raw %}

class CameraProperties[source]

CameraProperties(json_path:str=None, pkl_path:str=None, print_settings:bool=False, **kwargs)

Save and load OpenHSI camera settings and calibration
Type Default Details
json_path str `` Path to settings file
pkl_path str `` Path to calibration file
print_settings bool False Print out settings file contents
kwargs No Content
{% endraw %} {% raw %}
{% endraw %} {% raw %}

CameraProperties.dump[source]

CameraProperties.dump(json_path:str=None, pkl_path:str=None)

Save the settings and calibration files
{% endraw %}

For example, the contents of CameraProperties consist of two dictionaries. To produce the files cam_settings.json and cam_calibration.pkl, follow the steps outlined in the calibration module.

{% raw %}
cam_prop = CameraProperties(pkl_path="assets/cam_calibration.pkl")
cam_prop
settings = 
{}

calibration = 
{'HgAr_pic': array([[0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       ...,
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.]]), 'smile_shifts': array([9, 9, 9, ..., 0, 0, 0], dtype=int16), 'wavelengths': array([418.37123297, 418.74477592, 419.11877271, ..., 893.95644796,
       894.09194015, 894.22658974]), 'wavelengths_linear': array([419.81906422, 420.25228866, 420.68551311, ..., 951.81868076,
       952.2519052 , 952.68512965]), 'flat_field_pic': array([[ 9,  7,  9, ...,  2,  3,  1],
       [ 8,  7,  8, ...,  3,  3,  4],
       [ 8,  7, 10, ...,  6,  7,  6],
       ...,
       [ 8,  6,  7, ...,  2,  3,  1],
       [ 3,  3,  3, ...,  1,  2,  2],
       [ 1,  1,  1, ...,  1,  1,  1]], dtype=uint8), 'rad_ref': <xarray.DataArray (variable: 1, cross_track: 905, wavelength_index: 1240, exposure: 2, luminance: 2)>
array([[[[[ 0,  2],
          [ 0,  5]],

         [[ 0,  2],
          [ 0,  5]],

         [[ 0,  2],
          [ 0,  5]],

         ...,

         [[ 0,  7],
          [ 0, 15]],

         [[ 0,  7],
          [ 0, 15]],

         [[ 0,  7],
          [ 0, 15]]],

...

        [[[ 0,  2],
          [ 0,  5]],

         [[ 0,  2],
          [ 0,  5]],

         [[ 0,  2],
          [ 0,  5]],

         ...,

         [[ 0,  7],
          [ 0, 14]],

         [[ 0,  7],
          [ 0, 14]],

         [[ 0,  7],
          [ 0, 14]]]]], dtype=int32)
Coordinates:
  * cross_track       (cross_track) int32 0 1 2 3 4 5 ... 900 901 902 903 904
  * wavelength_index  (wavelength_index) int32 0 1 2 3 4 ... 1236 1237 1238 1239
  * exposure          (exposure) int32 10 20
  * luminance         (luminance) int32 0 10000
  * variable          (variable) <U8 'datacube', 'spec_rad_ref_luminance': 52020, 'sfit': <scipy.interpolate.interpolate.interp1d object at 0x124cc77c0>, 'rad_fit': <scipy.interpolate.interpolate.interp1d object at 0x124cc7bd0>}
{% endraw %} {% raw %}
cam_prop.calibration["rad_ref"]
<xarray.DataArray (variable: 1, cross_track: 905, wavelength_index: 1240, exposure: 2, luminance: 2)>
array([[[[[ 0,  2],
          [ 0,  5]],

         [[ 0,  2],
          [ 0,  5]],

         [[ 0,  2],
          [ 0,  5]],

         ...,

         [[ 0,  7],
          [ 0, 15]],

         [[ 0,  7],
          [ 0, 15]],

         [[ 0,  7],
          [ 0, 15]]],

...

        [[[ 0,  2],
          [ 0,  5]],

         [[ 0,  2],
          [ 0,  5]],

         [[ 0,  2],
          [ 0,  5]],

         ...,

         [[ 0,  7],
          [ 0, 14]],

         [[ 0,  7],
          [ 0, 14]],

         [[ 0,  7],
          [ 0, 14]]]]], dtype=int32)
Coordinates:
  * cross_track       (cross_track) int32 0 1 2 3 4 5 ... 900 901 902 903 904
  * wavelength_index  (wavelength_index) int32 0 1 2 3 4 ... 1236 1237 1238 1239
  * exposure          (exposure) int32 10 20
  * luminance         (luminance) int32 0 10000
  * variable          (variable) <U8 'datacube'
{% endraw %}

Transforms

We can apply a number of transforms to the camera's raw data, and these transforms are used to set the processing level during data collection. For example, we can perform a fast smile correction and wavelength binning during operation. With more processing, this is easily extended to obtain radiance and reflectance.

Some transforms require some setup which is done using CameraProperties.tfm_setup. This method also allows one to tack on an additional setup function with the argument more_setup which takes in any callable which can mutate the CameraProperties class.
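A more_setup callable only needs to accept the CameraProperties instance and mutate it. A minimal sketch, using a stand-in class so it runs without a camera (the dark_offset attribute here is hypothetical):

```python
import numpy as np

def extra_setup(cam_props):
    """Hypothetical extra setup step: preallocate a dark-frame offset
    that some later custom transform could subtract."""
    cam_props.dark_offset = np.zeros((905, 1240), dtype=np.int32)

# With a real camera you would pass it in like:
#   cam_prop.tfm_setup(more_setup=extra_setup)

class _FakeCameraProperties:  # stand-in so this sketch is self-contained
    pass

cp = _FakeCameraProperties()
extra_setup(cp)
print(cp.dark_offset.shape)  # (905, 1240)
```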

{% raw %}

CameraProperties.tfm_setup[source]

CameraProperties.tfm_setup(more_setup:Callable=None, dtype:Union[int32, float32]=int32, lvl:int=0)

Setup for transforms
{% endraw %} {% raw %}
{% endraw %} {% raw %}

CameraProperties.crop[source]

CameraProperties.crop(x:ndarray)

Crops to illuminated area
{% endraw %} {% raw %}

CameraProperties.fast_smile[source]

CameraProperties.fast_smile(x:ndarray)

Apply the fast smile correction procedure
{% endraw %} {% raw %}
{% endraw %} {% raw %}

CameraProperties.fast_bin[source]

CameraProperties.fast_bin(x:ndarray)

Changes the view of the datacube so that everything that needs to be binned is in the last axis. The last axis is then binned.
{% endraw %} {% raw %}
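The reshape trick behind CameraProperties.fast_bin can be sketched in plain numpy: view each group of adjacent spectral pixels in the last axis, then reduce over that axis. The bin size and array shape here are illustrative only:

```python
import numpy as np

x = np.arange(24, dtype=np.int32).reshape(4, 6)  # (cross_track, wavelength)
bin_size = 3

# Drop any leftover columns that don't fill a whole bin, group each
# `bin_size` adjacent bands into the last axis, then sum that axis.
usable = (x.shape[1] // bin_size) * bin_size
binned = x[:, :usable].reshape(x.shape[0], -1, bin_size).sum(axis=-1)

print(binned.shape)  # (4, 2)
print(binned[0])     # [ 3 12]  i.e. (0+1+2, 3+4+5)
```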

CameraProperties.slow_bin[source]

CameraProperties.slow_bin(x:ndarray)

Bins spectral bands accounting for the slight nonlinearity in the index-wavelength map
{% endraw %} {% raw %}
{% endraw %} {% raw %}

CameraProperties.dn2rad[source]

CameraProperties.dn2rad(x:ndarray)

Converts digital numbers to radiance (uW/cm^2/sr/nm). Use after cropping to useable area.
{% endraw %} {% raw %}

CameraProperties.rad2ref_6SV[source]

CameraProperties.rad2ref_6SV(x:ndarray)

Converts radiance to reflectance using the expected at-sensor radiance computed by 6SV
{% endraw %} {% raw %}
{% endraw %} {% raw %}

CameraProperties.set_processing_lvl[source]

CameraProperties.set_processing_lvl(lvl:int=-1, custom_tfms:List[Callable]=None)

Define the output `lvl` of the transform pipeline. Predefined recipes include:
-1: do not apply any transforms (default),
0 : raw digital numbers cropped to useable sensor area,
1 : crop + fast smile,
2 : crop + fast smile + fast binning,
3 : crop + fast smile + slow binning,
4 : crop + fast smile + fast binning + conversion to radiance in units of uW/cm^2/sr/nm,
5 : crop + fast smile + radiance + fast binning,
6 : crop + fast smile + fast binning + radiance + reflectance,
7 : crop + fast smile + radiance + slow binning,
8 : crop + fast smile + radiance + slow binning + reflectance.
{% endraw %} {% raw %}
{% endraw %}

You can add your own transform by monkey patching the CameraProperties class.

@patch
def identity(self:CameraProperties, x:np.ndarray) -> np.ndarray:
    """The identity tranform"""
    return x

If you don't require any camera settings or calibration files, a valid transform can be any Callable that takes in a 2D np.ndarray and returns a 2D np.ndarray.
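For example, a standalone transform that clips saturated pixels (the function name and saturation value here are hypothetical) could look like:

```python
import numpy as np

def clip_saturated(x: np.ndarray, saturation: int = 4095) -> np.ndarray:
    """Hypothetical transform: clamp values above a 12-bit saturation level."""
    return np.minimum(x, saturation)

frame = np.array([[100, 5000],
                  [4095,    0]], dtype=np.int32)
print(clip_saturated(frame))  # [[ 100 4095]
                              #  [4095    0]]
```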

Pipeline for Composing Transforms

Depending on the level of processing that one wants to do real-time, a number of transforms need to be composed in sequential order. To make this easy to customise, you can use the pipeline method and pass in a raw camera frame and an ordered list of transforms.

To make the transforms pipeline easy to use and customise, you can use the CameraProperties.set_processing_lvl method.
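Conceptually, a pipeline just folds the frame through the list of transforms in order. A minimal sketch of that idea (not the library's exact implementation; the transforms here are made up):

```python
from functools import reduce
import numpy as np

# Two toy transforms standing in for e.g. crop and a scaling step.
tfms = [lambda x: x[1:-1, :],
        lambda x: x * 2]

def apply_pipeline(x: np.ndarray, tfms) -> np.ndarray:
    """Fold x through each transform in order."""
    return reduce(lambda acc, f: f(acc), tfms, x)

frame = np.ones((4, 3), dtype=np.int32)
out = apply_pipeline(frame, tfms)
print(out.shape)  # (2, 3)
```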

{% raw %}

CameraProperties.pipeline[source]

CameraProperties.pipeline(x:ndarray)

Compose a list of transforms and apply to x.
{% endraw %} {% raw %}
{% endraw %} {% raw %}
# if wavelength calibration is changed, this needs to be updated

cam_prop = CameraProperties("assets/cam_settings.json","assets/cam_calibration.pkl")

cam_prop.set_processing_lvl(-1) # raw digital numbers
test_eq( (924,1240), np.shape( cam_prop.pipeline(cam_prop.calibration["HgAr_pic"])) )

cam_prop.set_processing_lvl(0) # cropped
test_eq( (905, 1240), np.shape( cam_prop.pipeline(cam_prop.calibration["HgAr_pic"])) )

cam_prop.set_processing_lvl(1) # fast smile corrected
test_eq( (905, 1231), np.shape( cam_prop.pipeline(cam_prop.calibration["HgAr_pic"])) )

cam_prop.set_processing_lvl(2) # fast binned
test_eq( (905, 136),  np.shape( cam_prop.pipeline(cam_prop.calibration["HgAr_pic"])) )

cam_prop.set_processing_lvl(4) # radiance
test_eq( (905, 136),  np.shape( cam_prop.pipeline(cam_prop.calibration["HgAr_pic"])) )

# cam_prop.set_processing_lvl(6) # reflectance
# test_eq( (452,108),  np.shape( cam_prop.pipeline(cam_prop.calibration["HgAr_pic"])) )

# cam_prop.set_processing_lvl(5) # radiance conversion moved earlier in pipeline
# test_eq( (452,108),  np.shape( cam_prop.pipeline(cam_prop.calibration["HgAr_pic"])) )

cam_prop = CameraProperties("assets/cam_settings.json","assets/cam_calibration.pkl")  
cam_prop.set_processing_lvl(3) # slow binned
test_eq( (905, 118),  np.shape( cam_prop.pipeline(cam_prop.calibration["HgAr_pic"])) )
/Users/eway/.pyenv/versions/3.8.3/lib/python3.8/site-packages/xarray/core/indexes.py:234: FutureWarning: Passing method to Int64Index.get_loc is deprecated and will raise in a future version. Use index.get_indexer([item], method=...) instead.
  indexer = self.index.get_loc(
{% endraw %}

Buffer for Data Collection

DataCube takes a line with coordinates of wavelength (x-axis) against cross-track (y-axis), and stores the smile corrected version in its CircArrayBuffer.

To facilitate near real-time processing, a fast smile correction procedure is used. An option to use a fast binning procedure is also available. When using these two procedures, the overhead is roughly 2 ms on a Jetson board.

Instead of preallocating another buffer for the next data collection, one can exploit the circular nature of the DataCube and reuse the internal buffer without modification - just use DataCube.put as normal.

Storage Allocation

All data buffers are preallocated so it's no secret that hyperspectral datacubes are memory hungry. For reference:

| along-track pixels | wavelength binning | RAM needed | time to collect at 10 ms exposure | time to save to SSD |
|---|---|---|---|---|
| 4096 | 4 nm | ≈ 800 MB | ≈ 55 s | ≈ 3 s |
| 1024 | no binning | ≈ 4 GB | ≈ 14 s | ≈ 15 s |

In reality, it is very difficult to work with raw, unbinned data due to the massive RAM usage and the extended time needed to save the NetCDF file to disk, which hinders real-time analysis. With binning, the frame rate (at 10 ms exposure) drops from 90 fps to 75 fps. In our experimentation, an SSD mounted in an M.2 slot on a Jetson board provided the fastest experience. When using other development boards such as a Raspberry Pi 4, the USB 3.0 port is recommended over the USB 2.0 port.
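As a sanity check on the figures above, the buffer footprint is just n_lines × cross-track × bands × bytes per element. A quick sketch, assuming int32 storage and the 905 × 1240 cropped frame used in the examples on this page:

```python
def datacube_mb(n_lines: int, cross_track: int, n_bands: int,
                bytes_per_px: int = 4) -> float:
    """Approximate RAM (in MB) for a preallocated int32 datacube buffer."""
    return n_lines * cross_track * n_bands * bytes_per_px / 2**20

# 1024 along-track lines, unbinned spectrum: about 4 GB.
print(f"{datacube_mb(1024, 905, 1240):.0f} MB")  # 4384 MB

# 256 lines at ~136 binned bands, as in the DataCube example below.
print(f"{datacube_mb(256, 905, 136):.2f} MB")    # 120.20 MB
```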

{% raw %}

class DateTimeBuffer[source]

DateTimeBuffer(n:int=16)

Records timestamps in UTC time.
{% endraw %} {% raw %}
{% endraw %} {% raw %}

DateTimeBuffer.update[source]

DateTimeBuffer.update()

Stores current UTC time in an internal buffer when this method is called.
{% endraw %} {% raw %}
timestamps = DateTimeBuffer(n=8)
for i in range(8):
    timestamps.update()
    
timestamps.data
array([datetime.datetime(2022, 3, 18, 11, 48, 6, 488249, tzinfo=datetime.timezone.utc),
       datetime.datetime(2022, 3, 18, 11, 48, 6, 488283, tzinfo=datetime.timezone.utc),
       datetime.datetime(2022, 3, 18, 11, 48, 6, 488291, tzinfo=datetime.timezone.utc),
       datetime.datetime(2022, 3, 18, 11, 48, 6, 488298, tzinfo=datetime.timezone.utc),
       datetime.datetime(2022, 3, 18, 11, 48, 6, 488305, tzinfo=datetime.timezone.utc),
       datetime.datetime(2022, 3, 18, 11, 48, 6, 488311, tzinfo=datetime.timezone.utc),
       datetime.datetime(2022, 3, 18, 11, 48, 6, 488318, tzinfo=datetime.timezone.utc),
       datetime.datetime(2022, 3, 18, 11, 48, 6, 488324, tzinfo=datetime.timezone.utc)],
      dtype=object)
{% endraw %} {% raw %}

class DataCube[source]

DataCube(n_lines:int=16, processing_lvl:int=-1, warn_mem_use:bool=True, preserve_raw:bool=False, json_path:str=None, pkl_path:str=None, print_settings:bool=False) :: CameraProperties

Facilitates the collection, viewing, and saving of hyperspectral datacubes.
Type Default Details
n_lines int 16 How many along-track pixels desired
processing_lvl int -1 Desired real time processing level
warn_mem_use bool True Give a warning if trying to allocate too much memory (> 80% of available RAM)
preserve_raw bool False Not implemented
json_path str `` No Content
pkl_path str `` No Content
print_settings bool False No Content
{% endraw %} {% raw %}
{% endraw %}

load_nc expects the datacube to have coordinates (wavelength, cross-track, along-track). This is a format that can be viewed in ENVI and QGIS. If you have a datacube with coordinates (cross-track, along-track, wavelength), then set the parameter old_style=True.

The plot_lib argument selects the plotting backend. Choose matplotlib if you want static plots; choose bokeh if you want to compose plots together and use interactive tools.

{% raw %}
@patch
def put(self:DataCube, x:np.ndarray):
    """Applies the composed tranforms and writes the 2D array into the data cube. Stores a timestamp for each push."""
    self.timestamps.update()
    self.dc.put( self.pipeline(x) )

@patch
def save(self:DataCube, 
         save_dir:str,                 # Path to folder where all datacubes will be saved at
         preconfig_meta_path:str=None, # Path to a .json file that includes metadata fields to be saved inside datacube
         prefix:str="",                # Prepend a custom prefix to your file name
         suffix:str="",                # Append a custom suffix to your file name
        ):
    """Saves to a NetCDF file (and RGB representation) to directory dir_path in folder given by date with file name given by UTC time."""
    if preconfig_meta_path is not None:
        with open(preconfig_meta_path) as json_file:
            attrs = json.load(json_file)
    else: attrs = {}
    if hasattr(self, "ds_metadata"): attrs = self.ds_metadata

    Path(f"{save_dir}/{self.timestamps[0].strftime('%Y_%m_%d')}").mkdir(parents=True, exist_ok=True)
    self.directory = f"{save_dir}/{self.timestamps[0].strftime('%Y_%m_%d')}"

    if hasattr(self, "binned_wavelengths"):
        wavelengths = self.binned_wavelengths if self.proc_lvl not in (3,7,8) else self.λs
    else:
        wavelengths = np.arange(self.dc.data.shape[2])

    if hasattr(self,"cam_temperatures"):
        self.coords = dict(wavelength=(["wavelength"],wavelengths),
                           x=(["x"],np.arange(self.dc.data.shape[0])),
                           y=(["y"],np.arange(self.dc.data.shape[1])),
                           time=(["time"],self.timestamps.data.astype(np.datetime64)),
                           temperature=(["temperature"],self.cam_temperatures.data))
    else:
        self.coords = dict(wavelength=(["wavelength"],wavelengths),
                           x=(["x"],np.arange(self.dc.data.shape[0])),
                           y=(["y"],np.arange(self.dc.data.shape[1])),
                           time=(["time"],self.timestamps.data.astype(np.datetime64)))

    # time coordinates can only be saved in np.datetime64 format
    self.nc = xr.Dataset(data_vars=dict(datacube=(["wavelength","x","y"],np.moveaxis(self.dc.data, -1, 0) )),
                         coords=self.coords, attrs=attrs)  

    """provide metadata to NetCDF coordinates"""
    self.nc.x.attrs["long_name"]   = "cross-track"
    self.nc.x.attrs["units"]       = "pixels"
    self.nc.x.attrs["description"] = "cross-track spatial coordinates"
    self.nc.y.attrs["long_name"]   = "along-track"
    self.nc.y.attrs["units"]       = "pixels"
    self.nc.y.attrs["description"] = "along-track spatial coordinates"
    self.nc.time.attrs["long_name"]   = "along-track"
    self.nc.time.attrs["description"] = "along-track spatial coordinates"
    self.nc.wavelength.attrs["long_name"]   = "wavelength_nm"
    self.nc.wavelength.attrs["units"]       = "nanometers"
    self.nc.wavelength.attrs["description"] = "wavelength in nanometers."
    if hasattr(self,"cam_temperatures"):
        self.nc.temperature.attrs["long_name"] = "camera temperature"
        self.nc.temperature.attrs["units"] = "degrees Celsius"
        self.nc.temperature.attrs["description"] = "temperature of sensor at time of image capture"

    self.nc.datacube.attrs["long_name"]   = "hyperspectral datacube"
    self.nc.datacube.attrs["units"]       = "digital number"
    if self.proc_lvl in (4,5,7): self.nc.datacube.attrs["units"] = "uW/cm^2/sr/nm"
    elif self.proc_lvl in (6,8): self.nc.datacube.attrs["units"] = "percentage reflectance"
    self.nc.datacube.attrs["description"] = "hyperspectral datacube"

    self.nc.to_netcdf(f"{self.directory}/{prefix}{self.timestamps[0].strftime('%Y_%m_%d-%H_%M_%S')}{suffix}.nc")

    fig = self.show("matplotlib",hist_eq=True,quick_imshow=True)
    fig.savefig(f"{self.directory}/{prefix}{self.timestamps[0].strftime('%Y_%m_%d-%H_%M_%S')}{suffix}.png",
               bbox_inches='tight', pad_inches=0)

@patch
def load_nc(self:DataCube, 
            nc_path:str,            # Path to a NetCDF4 file
            old_style:bool = False, # Only for backwards compatibility for datacubes created before first release
           ):
    """Lazy load a NetCDF datacube into the DataCube buffer."""
    with xr.open_dataset(nc_path) as ds:
        if old_style: # cross-track, along-track, wavelength
            self.dc      = CircArrayBuffer(size=ds.datacube.shape, axis=1, dtype=type(np.array(ds.datacube[0,0])[0]))
            self.dc.data = np.array(ds.datacube)
        else: # wavelength, cross-track, along-track -> convert to old_style (datacube inserts do not need transpose)
            shape = (*ds.datacube.shape[1:],ds.datacube.shape[0])
            self.dc      = CircArrayBuffer(size=shape, axis=1, dtype=type(np.array(ds.datacube[0,0])[0]))
            self.dc.data = np.moveaxis(np.array(ds.datacube), 0, -1)
        print(f"Allocated {4*reduce(lambda x,y: x*y, ds.datacube.shape)/2**20:.02f} MB of RAM.")

        self.ds_timestamps = ds.time.to_numpy() # type is np.datetime64. convert to datetime.datetime
        unix_epoch = np.datetime64(0, 's')
        one_second = np.timedelta64(1, 's')
        seconds_since_epoch = (self.ds_timestamps - unix_epoch) / one_second
        self.ds_timestamps = np.array([datetime.utcfromtimestamp(s) for s in seconds_since_epoch])
        self.timestamps.data = self.ds_timestamps
        self.ds_metadata = ds.attrs

        if hasattr(ds,"temperature"):
            self.ds_temperatures = ds.temperature.to_numpy()
            self.cam_temperatures = CircArrayBuffer(size=self.ds_temperatures.shape,dtype=np.float32)
            self.cam_temperatures.data = self.ds_temperatures
        self.binned_wavelengths = np.array(ds.wavelength)
        self.dc.slots_left      = 0 # indicate that the data buffer is full

@patch
def show(self:DataCube, 
         plot_lib:str = "bokeh", # Plotting backend. This can be 'bokeh' or 'matplotlib'
         red_nm:float = 640.,    # Wavelength in nm to use as the red
         green_nm:float = 550.,  # Wavelength in nm to use as the green
         blue_nm:float = 470.,   # Wavelength in nm to use as the blue
         robust:bool = False,    # Choose to plot using the 2-98% percentile. Robust to outliers
         hist_eq:bool = False,   # Choose to plot using histogram equilisation
         quick_imshow:bool = False, # Used to skip holoviews and use matplotlib for a static plot
         **plot_kwargs,          # Any other plotting options to be used in your plotting backend
        ) -> "bokeh or matplotlib plot":
    """Generate an RGB image from chosen RGB wavelengths with histogram equalisation or percentile options. 
    The plotting backend can be specified by `plot_lib` and can be "bokeh" or "matplotlib". 
    Further customise your plot with `**plot_kwargs`. `quick_imshow` is used for saving figures quickly
    but cannot be used to make interactive plots. """

    rgb = np.zeros( (*self.dc.data.shape[:2],3), dtype=np.float32)
    if hasattr(self, "binned_wavelengths"):
        rgb[...,0] = self.dc.data[:,:,np.argmin(np.abs(self.binned_wavelengths-red_nm))]
        rgb[...,1] = self.dc.data[:,:,np.argmin(np.abs(self.binned_wavelengths-green_nm))]
        rgb[...,2] = self.dc.data[:,:,np.argmin(np.abs(self.binned_wavelengths-blue_nm))]
    else:
        rgb[...,0] = self.dc.data[:,:,int(self.dc.data.shape[2] / 2)]
        rgb[...,1] = self.dc.data[:,:,int(self.dc.data.shape[2] / 2)]
        rgb[...,2] = self.dc.data[:,:,int(self.dc.data.shape[2] / 2)]

    if robust and not hist_eq: # scale everything to the 2% and 98% percentile
        vmax = np.nanpercentile(rgb, 98)
        vmin = np.nanpercentile(rgb, 2)
        rgb = ((rgb.astype("f8") - vmin) / (vmax - vmin)).astype("f4")
        rgb = np.minimum(np.maximum(rgb, 0), 1)
    elif hist_eq and not robust:
        img_hist, bins = np.histogram(rgb.flatten(), 256, density=True)
        cdf = img_hist.cumsum() # cumulative distribution function
        cdf = 1. * cdf / cdf[-1] # normalize
        img_eq = np.interp(rgb.flatten(), bins[:-1], cdf) # find new pixel values from linear interpolation of cdf
        rgb = img_eq.reshape(rgb.shape)
    elif robust and hist_eq:
        warnings.warn("Cannot mix robust with histogram equalisation. No RGB adjustments will be made.",stacklevel=2)
        rgb /= np.max(rgb)
    else:
        rgb /= np.max(rgb)

    if quick_imshow:
        fig, ax = plt.subplots(figsize=(12,3))
        ax.imshow(rgb,aspect="equal"); ax.set_xlabel("along-track"); ax.set_ylabel("cross-track")
        return fig

    import holoviews as hv
    hv.extension(plot_lib,logo=False)
    rgb_hv = hv.RGB((np.arange(rgb.shape[1]),np.arange(rgb.shape[0]),
                     rgb[:,:,0],rgb[:,:,1],rgb[:,:,2]))

    if plot_lib == "bokeh":
        return rgb_hv.opts(width=1000,height=250,frame_height=int(rgb.shape[0]//3)).opts(**plot_kwargs).opts(
            xlabel="along-track",ylabel="cross-track",invert_yaxis=True)
    else: # plot_lib == "matplotlib"
        return rgb_hv.opts(fig_inches=22).opts(**plot_kwargs).opts(
            xlabel="along-track",ylabel="cross-track",invert_yaxis=True)
{% endraw %} {% raw %}
n = 256

dc = DataCube(n_lines=n,processing_lvl=2,json_path="assets/cam_settings.json",pkl_path="assets/cam_calibration.pkl")

for i in range(200):
    dc.put( np.random.randint(0,255,dc.settings["resolution"]) )

dc.show("bokeh")
Allocated 120.20 MB of RAM.
{% endraw %} {% raw %}
dc.save("assets")
<ipython-input-32-660ecb0d4ed6>:67: DeprecationWarning: parsing timezone aware datetimes is deprecated; this will raise an error in the future
  time=(["time"],self.timestamps.data.astype(np.datetime64)))
{% endraw %}